package demo.concurrent.executors.myexecutor;

import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 　　　　　　　   ┏┓　   ┏┓+ +
 * 　　　　　　　┏┛┻━━━┛┻┓ + +
 * 　　　　　　　┃　　　　　　　┃
 * 　　　　　　　┃　　　━　　　┃ ++ + + +
 * 　　　　　　 ████━████ ┃+
 * 　　　　　　　┃　　　　　　　┃ +
 * 　　　　　　　┃　　　┻　　　┃
 * 　　　　　　　┃　　　　　　　┃ + +
 * 　　　　　　　┗━┓　　　┏━┛
 * 　　　　　　　　　┃　　　┃
 * 　　　　　　　　　┃　　　┃ + + + +
 * 　　　　　　　　　┃　　　┃			God beast body, code no BUG
 * 　　　　　　　　　┃　　　┃ +			神兽护体,代码无BUG
 * 　　　　　　　　　┃　　　┃
 * 　　　　　　　　　┃　　　┃　　+
 * 　　　　　　　　　┃　 　　┗━━━┓ + +
 * 　　　　　　　　　┃ 　　　　　　　┣┓
 * 　　　　　　　　　┃ 　　　　　　　┏┛
 * 　　　　　　　　　┗┓┓┏━┳┓┏┛ + + + +
 * 　　　　　　　　　　┃┫┫　┃┫┫
 * 　　　　　　　　　　┗┻┛　┗┻┛+ + + +
 *
 * @title: 自定义线程池
 * @author zhonghaijun
 * @date 2021-03-02
 */
public class ZThreadPoolExecutor {

    /**
     * 核心线程数
     */
    private Integer coreWorker;

    /**
     * 最大线程数
     */
    private Integer maxWorker;


    /**
     * 工作线程数
     */
    private AtomicInteger workerCount;

    /**
     * 排它锁
     */
    private ReentrantLock mainLock = new ReentrantLock();

    /**
     * 线程池状态
     */
    private AtomicBoolean isShutDown = new AtomicBoolean(false);


    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 线程池
     */
    private HashSet<Worker> workSet;

    /**
     * 计算线程完成的数
     */
    private long completedTaskCount;

    /**
     * 是否允许核心线程超时，如果为true，那么线程池最后会收缩至0
     */
    private boolean allowCoreThreadTimeOut = false;

    /**
     * 设置超时时间
     */
    private long timeOut;


    /**
     * 线程池拒绝策略
     */
    private MyRejectHandler handler;


    public ZThreadPoolExecutor(Integer coreWorker,Integer maxWorker,long timeOut, BlockingQueue<Runnable> taskQueue, MyRejectHandler handler) {
        this.coreWorker = coreWorker;
        this.maxWorker = maxWorker;
        this.workerCount = new AtomicInteger(0);
        this.timeOut = timeOut;
        this.taskQueue = taskQueue;
        this.workSet = new HashSet<>();
        this.handler = handler;
    }

    /**
     * 提交线程任务
     * @param runnable
     */
    public void execute(Runnable runnable){
        Assert.notNull(runnable,"任务不能为空");
        //比较核心线程是否创建完毕
        int workerCount = this.workerCount.get();
        if(workerCount < coreWorker){
            //创建核心线程
            addWorker(runnable,true);
            return;
        }
        boolean status = this.isShutDown.get();
        //状态为运行中，并且添加到等待队列成功
        if(!status && this.taskQueue.offer(runnable)){
            System.out.println("任务入队。。");
            status = this.isShutDown.get();
            //如果状态是已经关闭，移除任务
            if(status && this.taskQueue.remove(runnable)){
                //调用拒绝策略
                handler.rejectHandler(runnable,this);
            }
            //如果获取到的工作线程数为0，那么就创建一个默认的
            workerCount = this.workerCount.get();
            if(workerCount == 0){
                addWorker(null,false);
            }
        }else if(!addWorker(runnable,false)){
            //添加失败调用拒绝策略
            handler.rejectHandler(runnable,this);
        }

    }

    private Boolean addWorker(Runnable firstTask,Boolean core){
        //retry 使用break retry跳过当前标记下继续循环，continue retry表示跳到当前标记下继续循环
        retry:
        for (;;){
            boolean status = isShutDown.get();
            if(status){
                //判断运行状态
                System.out.println("线程池已经关闭，提交任务失败。。");
                return false;
            }
            for (;;){
                int workerCount = this.workerCount.get();
                //每次添加worker都要判断工作线程数是否要大于核心线程数或者最大线程数
                //如果大于核心线程数，返回false，重新创建普通线程；如果大于最大线程数，返回false调用拒绝策略
                if(workerCount >= (core ? coreWorker:maxWorker)){
                    return false;
                }
                //比较交换，将工作线程数加一
                if(this.workerCount.compareAndSet(workerCount,workerCount+1)){
                    System.out.println("工作线程数："+this.workerCount.get());
                    break retry;
                }
                if(isShutDown.get() != status){
                    continue retry;
                }
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker worker = null;
        try {
            worker = new Worker(firstTask);
            //获取到工作线程本身
            Thread thread = worker.thread;
            if(thread != null){
                try {
                    //将创建的Worker添加到工作队列中，涉及到对数组的操作，需要加锁
                    this.mainLock.lock();
                    boolean status = this.isShutDown.get();
                    //判断状态是否已经关闭
                    if(!status){
                        //判断当前线程是否已经启动，如果启动了就抛出异常
                        if(thread.isAlive()){
                            throw new IllegalAccessException(thread.getName()+"线程已经启动，不能再次启动");
                        }
                        //添加到工作队列中
                        workSet.add(worker);
                        workerAdded = true;
                    }
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } finally {
                    this.mainLock.unlock();
                }
                //如果被添加成功了就启动线程
                if(workerAdded){
                    //启动工作线程
                    System.out.println("启动工作线程。。");
                    thread.start();
                    workerStarted = true;
                }
            }
        } finally {
            if(!workerAdded){
                System.out.println("工作线程添加失败");
                addWorkerFailed(worker);
            }
        }
        return workerStarted;
    }

    /**
     * 添加工作队列失败，删除掉，并且将工作线程数减一
     * @param worker
     */
    private void addWorkerFailed(Worker worker){
        try {
            this.mainLock.lock();
            if(worker != null){
                this.workSet.remove(worker);
                //执行比较交换，减去工作线程
                doDecrementWorkerCount();
            }
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 真正worker执行的方法
     * @param worker
     */
    private void runWorker(Worker worker) {
        Thread thread = Thread.currentThread();
        //获取到第一次的任务
        Runnable firstTask = worker.firstTask;
        //清空
        worker.firstTask = null;
        boolean completedAbruptly = true;
        try {
            //先判断第一次触发的任务是否为空，如果为空则去队列中获取任务，当前是while循环会一直去任务队列中获取数据
            while (firstTask != null || (firstTask = getTask()) != null){
                worker.lock();
                boolean status = this.isShutDown.get();
                //判断线程池是否已经关闭，并且线程是否已经中断，如果未中断就调用interrupt设置中断标记
                //isInterrupted()判断当前线程是否被标记为中断，但是不会清楚标记；interrupted()判断是否标记为中断，并且会清除标记
                if(status && !thread.isInterrupted()){
                    System.out.println("线程池获取失败，中断任务。。");
                    thread.interrupt();
                }
                try {
                    //直接调用task任务的run方法执行
                    firstTask.run();
                } catch (Exception e){
                    throw new RuntimeException(e);
                } finally {
                    //清空工作线程中的任务
                    firstTask = null;
                    worker.finishTaskNum ++;
                    worker.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(worker,completedAbruptly);
        }
    }

    private void processWorkerExit(Worker worker, Boolean completedAbruptly) {
        //如果任务不是正常的退出，减去工作线程
        if(completedAbruptly){
            doDecrementWorkerCount();
        }
        this.mainLock.lock();
        try {
            //线程池记录总共完成的任务数量
            completedTaskCount += worker.finishTaskNum;
            //移除掉worker
            workSet.remove(worker);
        } finally {
            this.mainLock.unlock();
        }
        //如果线程池没有关闭，并且队列不为空时，创建新的工作线程
        boolean status = this.isShutDown.get();
        if(status){
            System.out.println("线程池已经关闭，停止创建工作线程，工作线程数："+this.workerCount.get());
        }
        if(!status){
            if(!completedAbruptly){
                //如果允许核心线程超时，那么线程池存在的线程最小值为0 否则最小值就是核心线程数
                int min = allowCoreThreadTimeOut ? 0 : coreWorker;
                //如果最小值为0，并且任务队列中还有任务存在，那么线程池数量保持为1
                if(min == 0 && !taskQueue.isEmpty()){
                    min = 1;
                }
                //如果工作线程还大于最小值的话，直接返回不用创建
                if(this.workerCount.get() >= min){
                    return;
                }
            }
            addWorker(null,false);
        }
    }

    /**
     * 任务队列中获取任务
     * @return
     */
    private Runnable getTask() {
        boolean isTimeout = false;
        for (;;){
            boolean status = this.isShutDown.get();
            if(status){
                System.out.println("线程池已经关闭，停止获取任务");
                return null;
            }
            try {
                //是否允许核心线程超时，工作线程是否大于了核心线程
                boolean timed = allowCoreThreadTimeOut || this.workerCount.get() > coreWorker;

                //工作线程进行收缩，如果允许核心线程超时，那么收缩时核心线程也会删除掉
                if((this.workerCount.get() > maxWorker || (isTimeout && timed)) && (this.workerCount.get() > 1 || taskQueue.isEmpty())){
                    if(this.workerCount.compareAndSet(this.workerCount.get(),this.workerCount.get() - 1)){
                        System.out.println("线程池进行收缩，收缩后工作线程："+ this.workerCount.get());
                        return null;
                    }
                    continue;
                }
                //take会阻塞并且释放cpu资源，poll则会根据等待的时间来一直阻塞但是不会释放cpu资源
                Runnable r = timed ? taskQueue.poll(timeOut, TimeUnit.SECONDS) : taskQueue.take();
                if(r != null){
                    return r;
                }
                isTimeout = true;
            } catch (InterruptedException e) {

            }
        }
    }

    public void shutdownNow(){
        while (!this.isShutDown.compareAndSet(this.isShutDown.get(),Boolean.TRUE)){
            break;
        }
    }

    private void doDecrementWorkerCount(){
        do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() - 1));
    }

    private void doIncrementWorkerCount(){
        do {}while (!this.workerCount.compareAndSet(workerCount.get(),workerCount.get() + 1));
    }



    private final class Worker extends ReentrantLock implements Runnable{

        /**
         * 第一次触发创建时的任务
         */
        private Runnable firstTask;

        /**
         * 工作线程，worker运行的线程
         */
        private Thread thread;

        /**
         * 已经处理了的任务
         */
        private volatile long finishTaskNum;

        public Worker(Runnable firstTask){
            this.firstTask = firstTask;
            //将worker本身包装进thread线程中
            this.thread = new Thread(this,"zhj_"+ UUID.randomUUID().toString());
        }




        @Override
        public void run() {
            //调用Executors中定义的runWorker方法
            runWorker(this);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Worker worker = (Worker) o;
            return Objects.equals(firstTask, worker.firstTask) &&
                    Objects.equals(thread, worker.thread) &&
                    Objects.equals(finishTaskNum, worker.finishTaskNum);
        }

        @Override
        public int hashCode() {
            return Objects.hash(firstTask, thread, finishTaskNum);
        }
    }
}
